#!/gscratch/comdata/modules/sw/anaconda/2019.10/envs/minimal_ds/bin/python3

## the purpose of this script is:
## -- to associate all revids and their associated traits with the article id
## 
## Assumptions:
## already have wikiq revisions data, view data, quality data.

""" 

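Pipeline outline. NOTE: main() in this script performs steps (0)-(3) only;
steps (4)-(7) describe the view/quality part of the workflow and are not
carried out by the code in this file.

(0) Per-user configurations and argument parsing

(1) Read in sample

(2) Reads in all the revisions data (wikiq output)

(3) Outputs revisions info on the sample

(4) Reads in all the view data

(5) Reads in all the quality data

(6) Joins view and quality data

(7) Outputs view-quality data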

"""


#!/usr/bin/env python3
import sys
# add pyspark to python path


sys.path.append("/com/local/spark/bin/python/pyspark")
sys.path.append("/com/local/spark/python")


from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.window import Window
from pyspark.sql import types
from pyspark.sql.types import *

import os
import pyspark.sql.functions as f
import numpy as np
import argparse
import glob
import datetime
import urllib
#from pyspark.sql.functions import udf
#from pyspark.sql.functions import * 
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StructType
import re

# Hard-coded input locations (cluster paths).
#for the sake of sanity, not passing these in as a flag

# Glob patterns for the page-view data.
# NOTE(review): not referenced by main() in this file -- confirm whether it is still needed here.
#viewDataList = ['/gscratch/comdata/users/kaylea/views/pre2018ViewData/*', '/gscratch/comdata/users/kaylea/views/post2018ViewData/**/*', '/gscratch/comdata/users/kaylea/views/fixup092021/*/*']
viewDataList = [ '/gscratch/comdata/users/kaylea/views/2015to2018/**/*', '/gscratch/comdata/users/kaylea/views/2019to2021/**/*', '/gscratch/comdata/users/kaylea/views/through2015/**/*']

# Glob pattern for the quality data (earlier locations kept commented out for reference).
# NOTE(review): not referenced by main() in this file -- confirm whether it is still needed here.
#qualityPath = '/gscratch/comdata/users/kaylea/anon_underprod/data/cleanQuality/*'
#qualityPath = '/gscratch/comdata/users/kaylea/taboo/processed_data/quality/*'
#qualityPath = '/gscratch/comdata/users/kaylea/taboo/processed_data/cleanQuality/*'
qualityPath = '/gscratch/comdata/users/kaylea/quality/processed_data/*'

# Revision data read by main() via spark.read.parquet.
#revDataPath = '/gscratch/comdata/output/wikiq_enwiki_20200301_nathante.parquet'
# the post processed version has various sliding-window calculations that I like even more
#revDataPath = '/gscratch/comdata/users/kaylea/anon_underprod/data/postproc_wikiq_enwiki_20200301_nathante.parquet'
revDataPath = '/gscratch/comdata/users/kaylea/taboo/processed_data/postproc_wikiqKHC_enwiki_202109'

# NOTE(review): salientData is not referenced by main() in this file -- confirm whether it is still needed here.
salientData = '/gscratch/comdata/users/kaylea/taboo/processed_data/grammifiedData/salientArticles.tsv'


##todo -- do I need this?
def makeYearMonth(timestamp):
    """Return the ``YYYY-MM`` prefix of a ``YYYYMM...`` timestamp.

    The timestamp is converted with ``str()``; characters 0-3 are taken as
    the year and characters 4-5 as the month. The result is also echoed to
    stderr as a debugging aid.

    Args:
        timestamp: Any value whose ``str()`` form starts with ``YYYYMM``
            (e.g. ``20200301`` or ``"20200301120000"``).

    Returns:
        The string ``"YYYY-MM"``.
    """
    timestamp = str(timestamp)
    year = timestamp[0:4]
    month = timestamp[4:6]
    ydashm = year + '-' + month
    # Python 3 print function: the Python 2 form `print >>sys.stderr, x`
    # raises TypeError under python3.
    print(ydashm, file=sys.stderr)
    return ydashm


##todo -- do I need this?
def monthYearToYearDashMonth(m, y):
    """Build a ``year-month`` string from separate month and year values.

    Months whose integer value is below 10 are rendered with exactly one
    leading zero; any other month is rendered via ``str(m)`` unchanged.

    Args:
        m: Month (anything ``int()`` accepts), or ``None``.
        y: Year; rendered with ``str()``.

    Returns:
        ``"<y>-<m>"``, or the literal string ``"null"`` when ``m`` is None
        (a notice is printed in that case).
    """
    # Guard clause: a missing month yields the sentinel string.
    if m is None:
        print("Found a None ", m)
        return "null"

    month_number = int(m)
    if month_number < 10:
        # Re-render through int so there is only one leading zero.
        month_text = "0" + str(month_number)
    else:
        month_text = str(m)

    return str(y) + "-" + month_text



def parse_args():
    """Parse the command-line arguments for this script.

    Both ``--output_dir`` and ``--input_dir`` are required: neither has a
    sensible default (two very different sample inputs are possible), and
    the rest of the script cannot run without them. Omitting either makes
    argparse print a usage message and exit rather than handing ``None``
    to the caller.

    Returns:
        argparse.Namespace with ``output_dir``, ``input_dir`` and ``format``
        (``format`` defaults to ``'tsv'``).
    """
    parser = argparse.ArgumentParser(description='Digs up revision, quality, and view data for a sample.')
    # no default because two very different sample inputs are possible
    parser.add_argument('-o', '--output_dir', help='Output directory', type=str, required=True)
    parser.add_argument('-i', '--input_dir', help='Input directory', type=str, required=True)
    parser.add_argument('--format', help="[tsv, parquet] format to output", type=str, default='tsv')
    return parser.parse_args()


def main():
    """Join the sample to the wikiq revision data and write the result.

    Steps:
      (0) Parse arguments, create the output directory, start Spark.
      (1) Read the tab-separated sample given by ``--input_dir``.
      (2) Read the revision parquet at ``revDataPath`` and left-join it
          onto the sample by ``encodedTitle``.
      (3) Write the joined rows to ``<output_dir>/revData`` as TSV when
          ``--format`` is ``tsv``, otherwise as parquet.

    Schemas and a few sample rows are printed along the way as diagnostics.
    """
    # (0) Per-user configurations and argument parsing
    print(sys.argv)
    args = parse_args()
    print(args)

    # handle -i
    sampleData = args.input_dir

    # handle -o; joining with '' guarantees a trailing path separator
    outputPath = os.path.join(args.output_dir, '')
    os.makedirs(outputPath, exist_ok=True)

    # Pass the conf to the builder so the application name is actually applied.
    conf = SparkConf().setAppName("joining quality, revision, and view data to a sample")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    # (1) Read in sample
    sparkReader = spark.read
    sampleDF = sparkReader.csv(sampleData, sep='\t', inferSchema=True, header=True, mode="PERMISSIVE", quote='"')
    sampleDF.show(15)
    sampleDF.printSchema()

    # (2) Reads in all the revisions data (wikiq output)
    wikiqDF = sparkReader.parquet(revDataPath)
    # The sample is joined on 'encodedTitle', so rename wikiq's 'title' to match.
    wikiqDF = wikiqDF.withColumnRenamed('title', 'encodedTitle')
    # clean out the weird quotes that otherwise bleed into the TSV
    wikiqDF = wikiqDF.withColumn('reverteds', f.regexp_replace('reverteds', '"', ''))
    wikiqDF.show(11)
    wikiqDF = wikiqDF.drop('articleid')
    wikiqDF.printSchema()
    sampleDF.printSchema()

    # everything in sample + everything that matches in wikiq output
    revDF = sampleDF.join(wikiqDF, ['encodedTitle'], how='left')
    revDF.printSchema()

    revDF.show(8)
    # eliminate cols I don't need
    revDF = revDF.drop('minor', 'namespace', 'sha1')
    revDF.show(8)

    # (3) Outputs revisions info on the sample
    # With no arguments, dropDuplicates() compares entire rows.
    revDF = revDF.dropDuplicates()
    revDF.printSchema()
    revDF = revDF.repartition(100)
    revDF.printSchema()

    revDataDir = outputPath + "revData"
    os.makedirs(revDataDir, exist_ok=True)
    if args.format == "tsv":
        revDF.write.csv(revDataDir, sep='\t', mode='overwrite', header=True, timestampFormat="yyyy-MM-dd HH:mm:ss")
    else:
        revDF.write.parquet(revDataDir, mode='overwrite')



# Run the job only when this file is executed as a script, not when imported.
if __name__ == "__main__":

    main()

